1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package com.sun.jmx.remote.internal;
27
28 import java.security.AccessController;
29 import java.security.PrivilegedAction;
30 import java.security.PrivilegedActionException;
31 import java.security.PrivilegedExceptionAction;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.HashSet;
36 import java.util.List;
37 import java.util.Set;
38 import java.util.HashMap;
39 import java.util.Map;
40
41 import javax.management.InstanceNotFoundException;
42 import javax.management.MBeanServer;
43 import javax.management.MBeanServerDelegate;
44 import javax.management.MBeanServerNotification;
45 import javax.management.Notification;
46 import javax.management.NotificationBroadcaster;
47 import javax.management.NotificationFilter;
48 import javax.management.NotificationFilterSupport;
49 import javax.management.NotificationListener;
50 import javax.management.ObjectName;
51 import javax.management.QueryEval;
52 import javax.management.QueryExp;
53
54 import javax.management.remote.NotificationResult;
55 import javax.management.remote.TargetedNotification;
56
57 import com.sun.jmx.remote.util.EnvHelp;
58 import com.sun.jmx.remote.util.ClassLogger;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111 public class ArrayNotificationBuffer implements NotificationBuffer {
112 private boolean disposed = false;
113
114
115
116 private static final Object globalLock = new Object();
117 private static final
118 HashMap<MBeanServer,ArrayNotificationBuffer> mbsToBuffer =
119 new HashMap<MBeanServer,ArrayNotificationBuffer>(1);
120 private final Collection<ShareBuffer> sharers = new HashSet<ShareBuffer>(1);
121
122 public static NotificationBuffer getNotificationBuffer(
123 MBeanServer mbs, Map<String, ?> env) {
124
125 if (env == null)
126 env = Collections.emptyMap();
127
128
129 int queueSize = EnvHelp.getNotifBufferSize(env);
130
131 ArrayNotificationBuffer buf;
132 boolean create;
133 NotificationBuffer sharer;
134 synchronized (globalLock) {
135 buf = mbsToBuffer.get(mbs);
136 create = (buf == null);
137 if (create) {
138 buf = new ArrayNotificationBuffer(mbs, queueSize);
139 mbsToBuffer.put(mbs, buf);
140 }
141 sharer = buf.new ShareBuffer(queueSize);
142 }
143
144
145
146
147
148
149
150
151
152 if (create)
153 buf.createListeners();
154 return sharer;
155 }
156
157
158
159
160
161 static void removeNotificationBuffer(MBeanServer mbs) {
162 synchronized (globalLock) {
163 mbsToBuffer.remove(mbs);
164 }
165 }
166
167 void addSharer(ShareBuffer sharer) {
168 synchronized (globalLock) {
169 synchronized (this) {
170 if (sharer.getSize() > queueSize)
171 resize(sharer.getSize());
172 }
173 sharers.add(sharer);
174 }
175 }
176
177 private void removeSharer(ShareBuffer sharer) {
178 boolean empty;
179 synchronized (globalLock) {
180 sharers.remove(sharer);
181 empty = sharers.isEmpty();
182 if (empty)
183 removeNotificationBuffer(mBeanServer);
184 else {
185 int max = 0;
186 for (ShareBuffer buf : sharers) {
187 int bufsize = buf.getSize();
188 if (bufsize > max)
189 max = bufsize;
190 }
191 if (max < queueSize)
192 resize(max);
193 }
194 }
195 if (empty) {
196 synchronized (this) {
197 disposed = true;
198
199 notifyAll();
200 }
201 destroyListeners();
202 }
203 }
204
205 private synchronized void resize(int newSize) {
206 if (newSize == queueSize)
207 return;
208 while (queue.size() > newSize)
209 dropNotification();
210 queue.resize(newSize);
211 queueSize = newSize;
212 }
213
214 private class ShareBuffer implements NotificationBuffer {
215 ShareBuffer(int size) {
216 this.size = size;
217 addSharer(this);
218 }
219
220 public NotificationResult
221 fetchNotifications(NotificationBufferFilter filter,
222 long startSequenceNumber,
223 long timeout,
224 int maxNotifications)
225 throws InterruptedException {
226 NotificationBuffer buf = ArrayNotificationBuffer.this;
227 return buf.fetchNotifications(filter, startSequenceNumber,
228 timeout, maxNotifications);
229 }
230
231 public void dispose() {
232 ArrayNotificationBuffer.this.removeSharer(this);
233 }
234
235 int getSize() {
236 return size;
237 }
238
239 private final int size;
240 }
241
242
243
244
245 private ArrayNotificationBuffer(MBeanServer mbs, int queueSize) {
246 if (logger.traceOn())
247 logger.trace("Constructor", "queueSize=" + queueSize);
248
249 if (mbs == null || queueSize < 1)
250 throw new IllegalArgumentException("Bad args");
251
252 this.mBeanServer = mbs;
253 this.queueSize = queueSize;
254 this.queue = new ArrayQueue<NamedNotification>(queueSize);
255 this.earliestSequenceNumber = System.currentTimeMillis();
256 this.nextSequenceNumber = this.earliestSequenceNumber;
257
258 logger.trace("Constructor", "ends");
259 }
260
261 private synchronized boolean isDisposed() {
262 return disposed;
263 }
264
265
266
267
268 public void dispose() {
269 throw new UnsupportedOperationException();
270 }
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299 public NotificationResult
300 fetchNotifications(NotificationBufferFilter filter,
301 long startSequenceNumber,
302 long timeout,
303 int maxNotifications)
304 throws InterruptedException {
305
306 logger.trace("fetchNotifications", "starts");
307
308 if (startSequenceNumber < 0 || isDisposed()) {
309 synchronized(this) {
310 return new NotificationResult(earliestSequenceNumber(),
311 nextSequenceNumber(),
312 new TargetedNotification[0]);
313 }
314 }
315
316
317 if (filter == null
318 || startSequenceNumber < 0 || timeout < 0
319 || maxNotifications < 0) {
320 logger.trace("fetchNotifications", "Bad args");
321 throw new IllegalArgumentException("Bad args to fetch");
322 }
323
324 if (logger.debugOn()) {
325 logger.trace("fetchNotifications",
326 "filter=" + filter + "; startSeq=" +
327 startSequenceNumber + "; timeout=" + timeout +
328 "; max=" + maxNotifications);
329 }
330
331 if (startSequenceNumber > nextSequenceNumber()) {
332 final String msg = "Start sequence number too big: " +
333 startSequenceNumber + " > " + nextSequenceNumber();
334 logger.trace("fetchNotifications", msg);
335 throw new IllegalArgumentException(msg);
336 }
337
338
339
340
341
342
343 long endTime = System.currentTimeMillis() + timeout;
344 if (endTime < 0)
345 endTime = Long.MAX_VALUE;
346
347 if (logger.debugOn())
348 logger.debug("fetchNotifications", "endTime=" + endTime);
349
350
351
352
353
354 long earliestSeq = -1;
355 long nextSeq = startSequenceNumber;
356 List<TargetedNotification> notifs =
357 new ArrayList<TargetedNotification>();
358
359
360
361 while (true) {
362 logger.debug("fetchNotifications", "main loop starts");
363
364 NamedNotification candidate;
365
366
367
368 synchronized (this) {
369
370
371
372 if (earliestSeq < 0) {
373 earliestSeq = earliestSequenceNumber();
374 if (logger.debugOn()) {
375 logger.debug("fetchNotifications",
376 "earliestSeq=" + earliestSeq);
377 }
378 if (nextSeq < earliestSeq) {
379 nextSeq = earliestSeq;
380 logger.debug("fetchNotifications",
381 "nextSeq=earliestSeq");
382 }
383 } else
384 earliestSeq = earliestSequenceNumber();
385
386
387
388
389
390
391 if (nextSeq < earliestSeq) {
392 logger.trace("fetchNotifications",
393 "nextSeq=" + nextSeq + " < " + "earliestSeq=" +
394 earliestSeq + " so may have lost notifs");
395 break;
396 }
397
398 if (nextSeq < nextSequenceNumber()) {
399 candidate = notificationAt(nextSeq);
400 if (logger.debugOn()) {
401 logger.debug("fetchNotifications", "candidate: " +
402 candidate);
403 logger.debug("fetchNotifications", "nextSeq now " +
404 nextSeq);
405 }
406 } else {
407
408
409
410
411 if (notifs.size() > 0) {
412 logger.debug("fetchNotifications",
413 "no more notifs but have some so don't wait");
414 break;
415 }
416 long toWait = endTime - System.currentTimeMillis();
417 if (toWait <= 0) {
418 logger.debug("fetchNotifications", "timeout");
419 break;
420 }
421
422
423 if (isDisposed()) {
424 if (logger.debugOn())
425 logger.debug("fetchNotifications",
426 "dispose callled, no wait");
427 return new NotificationResult(earliestSequenceNumber(),
428 nextSequenceNumber(),
429 new TargetedNotification[0]);
430 }
431
432 if (logger.debugOn())
433 logger.debug("fetchNotifications",
434 "wait(" + toWait + ")");
435 wait(toWait);
436
437 continue;
438 }
439 }
440
441
442
443
444
445
446 ObjectName name = candidate.getObjectName();
447 Notification notif = candidate.getNotification();
448 List<TargetedNotification> matchedNotifs =
449 new ArrayList<TargetedNotification>();
450 logger.debug("fetchNotifications",
451 "applying filter to candidate");
452 filter.apply(matchedNotifs, name, notif);
453
454 if (matchedNotifs.size() > 0) {
455
456
457
458
459
460 if (maxNotifications <= 0) {
461 logger.debug("fetchNotifications",
462 "reached maxNotifications");
463 break;
464 }
465 --maxNotifications;
466 if (logger.debugOn())
467 logger.debug("fetchNotifications", "add: " +
468 matchedNotifs);
469 notifs.addAll(matchedNotifs);
470 }
471
472 ++nextSeq;
473 }
474
475
476 int nnotifs = notifs.size();
477 TargetedNotification[] resultNotifs =
478 new TargetedNotification[nnotifs];
479 notifs.toArray(resultNotifs);
480 NotificationResult nr =
481 new NotificationResult(earliestSeq, nextSeq, resultNotifs);
482 if (logger.debugOn())
483 logger.debug("fetchNotifications", nr.toString());
484 logger.trace("fetchNotifications", "ends");
485
486 return nr;
487 }
488
489 synchronized long earliestSequenceNumber() {
490 return earliestSequenceNumber;
491 }
492
493 synchronized long nextSequenceNumber() {
494 return nextSequenceNumber;
495 }
496
497 synchronized void addNotification(NamedNotification notif) {
498 if (logger.traceOn())
499 logger.trace("addNotification", notif.toString());
500
501 while (queue.size() >= queueSize) {
502 dropNotification();
503 if (logger.debugOn()) {
504 logger.debug("addNotification",
505 "dropped oldest notif, earliestSeq=" +
506 earliestSequenceNumber);
507 }
508 }
509 queue.add(notif);
510 nextSequenceNumber++;
511 if (logger.debugOn())
512 logger.debug("addNotification", "nextSeq=" + nextSequenceNumber);
513 notifyAll();
514 }
515
516 private void dropNotification() {
517 queue.remove(0);
518 earliestSequenceNumber++;
519 }
520
521 synchronized NamedNotification notificationAt(long seqNo) {
522 long index = seqNo - earliestSequenceNumber;
523 if (index < 0 || index > Integer.MAX_VALUE) {
524 final String msg = "Bad sequence number: " + seqNo + " (earliest "
525 + earliestSequenceNumber + ")";
526 logger.trace("notificationAt", msg);
527 throw new IllegalArgumentException(msg);
528 }
529 return queue.get((int) index);
530 }
531
532 private static class NamedNotification {
533 NamedNotification(ObjectName sender, Notification notif) {
534 this.sender = sender;
535 this.notification = notif;
536 }
537
538 ObjectName getObjectName() {
539 return sender;
540 }
541
542 Notification getNotification() {
543 return notification;
544 }
545
546 public String toString() {
547 return "NamedNotification(" + sender + ", " + notification + ")";
548 }
549
550 private final ObjectName sender;
551 private final Notification notification;
552 }
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586 private void createListeners() {
587 logger.debug("createListeners", "starts");
588
589 synchronized (this) {
590 createdDuringQuery = new HashSet<ObjectName>();
591 }
592
593 try {
594 addNotificationListener(MBeanServerDelegate.DELEGATE_NAME,
595 creationListener, creationFilter, null);
596 logger.debug("createListeners", "added creationListener");
597 } catch (Exception e) {
598 final String msg = "Can't add listener to MBean server delegate: ";
599 RuntimeException re = new IllegalArgumentException(msg + e);
600 EnvHelp.initCause(re, e);
601 logger.fine("createListeners", msg + e);
602 logger.debug("createListeners", e);
603 throw re;
604 }
605
606
607
608 Set<ObjectName> names = queryNames(null, broadcasterQuery);
609 names = new HashSet<ObjectName>(names);
610
611 synchronized (this) {
612 names.addAll(createdDuringQuery);
613 createdDuringQuery = null;
614 }
615
616 for (ObjectName name : names)
617 addBufferListener(name);
618 logger.debug("createListeners", "ends");
619 }
620
621 private void addBufferListener(ObjectName name) {
622 checkNoLocks();
623 if (logger.debugOn())
624 logger.debug("addBufferListener", name.toString());
625 try {
626 addNotificationListener(name, bufferListener, null, name);
627 } catch (Exception e) {
628 logger.trace("addBufferListener", e);
629
630
631
632 }
633 }
634
635 private void removeBufferListener(ObjectName name) {
636 checkNoLocks();
637 if (logger.debugOn())
638 logger.debug("removeBufferListener", name.toString());
639 try {
640 removeNotificationListener(name, bufferListener);
641 } catch (Exception e) {
642 logger.trace("removeBufferListener", e);
643 }
644 }
645
646 private void addNotificationListener(final ObjectName name,
647 final NotificationListener listener,
648 final NotificationFilter filter,
649 final Object handback)
650 throws Exception {
651 try {
652 AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
653 public Void run() throws InstanceNotFoundException {
654 mBeanServer.addNotificationListener(name,
655 listener,
656 filter,
657 handback);
658 return null;
659 }
660 });
661 } catch (Exception e) {
662 throw extractException(e);
663 }
664 }
665
666 private void removeNotificationListener(final ObjectName name,
667 final NotificationListener listener)
668 throws Exception {
669 try {
670 AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
671 public Void run() throws Exception {
672 mBeanServer.removeNotificationListener(name, listener);
673 return null;
674 }
675 });
676 } catch (Exception e) {
677 throw extractException(e);
678 }
679 }
680
681 private Set<ObjectName> queryNames(final ObjectName name,
682 final QueryExp query) {
683 PrivilegedAction<Set<ObjectName>> act =
684 new PrivilegedAction<Set<ObjectName>>() {
685 public Set<ObjectName> run() {
686 return mBeanServer.queryNames(name, query);
687 }
688 };
689 try {
690 return AccessController.doPrivileged(act);
691 } catch (RuntimeException e) {
692 logger.fine("queryNames", "Failed to query names: " + e);
693 logger.debug("queryNames", e);
694 throw e;
695 }
696 }
697
698 private static boolean isInstanceOf(final MBeanServer mbs,
699 final ObjectName name,
700 final String className) {
701 PrivilegedExceptionAction<Boolean> act =
702 new PrivilegedExceptionAction<Boolean>() {
703 public Boolean run() throws InstanceNotFoundException {
704 return mbs.isInstanceOf(name, className);
705 }
706 };
707 try {
708 return AccessController.doPrivileged(act);
709 } catch (Exception e) {
710 logger.fine("isInstanceOf", "failed: " + e);
711 logger.debug("isInstanceOf", e);
712 return false;
713 }
714 }
715
716
717
718
719
720
721
722
723
724 private void createdNotification(MBeanServerNotification n) {
725 final String shouldEqual =
726 MBeanServerNotification.REGISTRATION_NOTIFICATION;
727 if (!n.getType().equals(shouldEqual)) {
728 logger.warning("createNotification", "bad type: " + n.getType());
729 return;
730 }
731
732 ObjectName name = n.getMBeanName();
733 if (logger.debugOn())
734 logger.debug("createdNotification", "for: " + name);
735
736 synchronized (this) {
737 if (createdDuringQuery != null) {
738 createdDuringQuery.add(name);
739 return;
740 }
741 }
742
743 if (isInstanceOf(mBeanServer, name, broadcasterClass)) {
744 addBufferListener(name);
745 if (isDisposed())
746 removeBufferListener(name);
747 }
748 }
749
750 private class BufferListener implements NotificationListener {
751 public void handleNotification(Notification notif, Object handback) {
752 if (logger.debugOn()) {
753 logger.debug("BufferListener.handleNotification",
754 "notif=" + notif + "; handback=" + handback);
755 }
756 ObjectName name = (ObjectName) handback;
757 addNotification(new NamedNotification(name, notif));
758 }
759 }
760
761 private final NotificationListener bufferListener = new BufferListener();
762
763 private static class BroadcasterQuery
764 extends QueryEval implements QueryExp {
765 private static final long serialVersionUID = 7378487660587592048L;
766
767 public boolean apply(final ObjectName name) {
768 final MBeanServer mbs = QueryEval.getMBeanServer();
769 return isInstanceOf(mbs, name, broadcasterClass);
770 }
771 }
772 private static final QueryExp broadcasterQuery = new BroadcasterQuery();
773
774 private static final NotificationFilter creationFilter;
775 static {
776 NotificationFilterSupport nfs = new NotificationFilterSupport();
777 nfs.enableType(MBeanServerNotification.REGISTRATION_NOTIFICATION);
778 creationFilter = nfs;
779 }
780
781 private final NotificationListener creationListener =
782 new NotificationListener() {
783 public void handleNotification(Notification notif,
784 Object handback) {
785 logger.debug("creationListener", "handleNotification called");
786 createdNotification((MBeanServerNotification) notif);
787 }
788 };
789
790 private void destroyListeners() {
791 checkNoLocks();
792 logger.debug("destroyListeners", "starts");
793 try {
794 removeNotificationListener(MBeanServerDelegate.DELEGATE_NAME,
795 creationListener);
796 } catch (Exception e) {
797 logger.warning("remove listener from MBeanServer delegate", e);
798 }
799 Set<ObjectName> names = queryNames(null, broadcasterQuery);
800 for (final ObjectName name : names) {
801 if (logger.debugOn())
802 logger.debug("destroyListeners",
803 "remove listener from " + name);
804 removeBufferListener(name);
805 }
806 logger.debug("destroyListeners", "ends");
807 }
808
809 private void checkNoLocks() {
810 if (Thread.holdsLock(this) || Thread.holdsLock(globalLock))
811 logger.warning("checkNoLocks", "lock protocol violation");
812 }
813
814
815
816
817
818 private static Exception extractException(Exception e) {
819 while (e instanceof PrivilegedActionException) {
820 e = ((PrivilegedActionException)e).getException();
821 }
822 return e;
823 }
824
825 private static final ClassLogger logger =
826 new ClassLogger("javax.management.remote.misc",
827 "ArrayNotificationBuffer");
828
829 private final MBeanServer mBeanServer;
830 private final ArrayQueue<NamedNotification> queue;
831 private int queueSize;
832 private long earliestSequenceNumber;
833 private long nextSequenceNumber;
834 private Set<ObjectName> createdDuringQuery;
835
836 static final String broadcasterClass =
837 NotificationBroadcaster.class.getName();
838 }